package com.migrate.module.migrate;

import com.migrate.module.config.ApplicationContextUtil;
import com.migrate.module.domain.*;
import com.migrate.module.enumeration.BinlogType;
import com.migrate.module.enumeration.ConsumerStatus;
import com.migrate.module.enumeration.DBChannel;
import com.migrate.module.enumeration.OperateType;
import com.migrate.module.exception.BusinessException;
import com.migrate.module.mapper.migrate.EtlBinlogConsumeRecordMapper;
import com.migrate.module.service.MigrateService;
import com.migrate.module.util.MigrateCheckUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * 对数据合并，并写入存储
 *
 * @author zhonghuashishan
 */
@Slf4j
public class MergeBinlogWrite {
    /**
     * 最大发送数
     */
    private static final Integer MAX_SEND = 500;
    /**
     * 比对时间戳
     */
    private static final String CREATE_TIME_STAMP = "createTime";

    private static final String TIME_STAMP = "updateTime";
    /**
     * 分隔符
     */
    private static final String SPLIT_KEY = "&";
    /**
     *  存储需要同步的数据对象
     */
    private List<BinLog> binlogDataList;
    /**
     * 用于存储过滤最新的数据
     */
    private final Map<String,BinLog> binlogDataMap = new HashMap<>(2048);

    /**
     * 本次同步的返回对象
     */
    private ScrollInfo scrollInfo = new ScrollInfo();
    /**
     * 存储本次需要更新的消息对象信息
     */
   private List<EtlBinlogConsumeRecord> etlBinlogConsumeRecordList = new ArrayList<>();

    /**
     * 对同步的数据进行合并，转换
     * @param binlogData MySQL的binlog对象
     */
    public void mergeBinlog(BinlogData binlogData){
        List<Map<String, Object>> dataList = binlogData.getDataMap();
        if (CollectionUtils.isEmpty(dataList)){
            return;
        }
        String key = MergeConfig.getSingleKey(binlogData.getTableName());

        for (Map<String,Object> dataMap:dataList){
            // 每次都需要先加入到集合中，用于处理完后，批量更新
            etlBinlogConsumeRecordList.add(binlogData.getConsumeRecord());

            // 先获取这笔同步记录的唯一标识字段
            String mergeKey = dataMap.get(key) + SPLIT_KEY + binlogData.getTableName();
            // 验证是否在这批同步的数据当中，有相同的更新记录
            BinLog getBinlogData = binlogDataMap.get(mergeKey);
            if (!ObjectUtils.isEmpty(getBinlogData)){
                // 判断历史的记录的操作时间 是否大于本次同步的操作时间
                if (getBinlogData.getOperateTime().compareTo(binlogData.getOperateTime()) >0){
                    continue;
                }
            }
            // 将数据转换为单条log对象
            BinLog binLog = buildBinLog(binlogData, dataMap);
            binlogDataMap.put(mergeKey, binLog);
        }
    }

    /**
     * 全量的数据同步
     */
    public ScrollInfo load(List<Map<String, Object>> scrollList, RangeScroll rangeScroll){
        // 过滤掉非需要匹配的时间断的数据 如果过滤后数据为空,则直接返回。
        filterCreateTime(scrollList,rangeScroll);
        if (scrollList.size() == 0){
            return scrollInfo;
        }
        // 数据转换为增量的模型
        transformModel(scrollList,rangeScroll.getTableName());
        // 对数据和新库进行过滤
        filterBinlogAging();
        // 写入新库
        write(OperateType.ALL);
        return scrollInfo;
    }


    /**
     * 首先拿到当前页最大的滚动ID，然后在过滤掉不是查询的时间区域的数据，防止全部数据都被过滤掉
     * @param scrollList
     * @param rangeScroll
     */
    private void filterCreateTime(List<Map<String, Object>> scrollList, RangeScroll rangeScroll){
        String key = MergeConfig.getSingleKey(rangeScroll.getTableName());
        scrollInfo.setMaxScrollId(scrollList.get(scrollList.size() - 1).get(key).toString());

        Iterator<Map<String, Object>> iterator = scrollList.iterator();
        while(iterator.hasNext()){
            Map<String, Object> scrollMap = iterator.next();
            Date createTime = (Date)scrollMap.get(CREATE_TIME_STAMP);
            if (createTime.compareTo(rangeScroll.getStartTime()) < 0 || createTime.compareTo(rangeScroll.getEndTime()) > 0){
                iterator.remove();
            }
        }
        scrollInfo.setScrollSize(scrollList.size());
    }

    /**
     * 对 合并后的数据进行验证是否为过时数据
     */
    public void filterBinlogAging(){
        // 批量查询数据，是否存在新库，并返回匹配的数据集合
        Map<String, Map<String, Object>> respMap = batchQuery();
        // 开始核对数据是否已经存在库中，并验证谁的时间最新过滤失效数据
         for (Map.Entry<String,BinLog> entry:binlogDataMap.entrySet()){
             BinLog binLog = entry.getValue();
             //当前同步要处理的表名称
             String tableName = binLog.getTableName();
             // 判断同步的数据库中，是否存新库已存在
             if (!CollectionUtils.isEmpty(respMap) && respMap.containsKey(entry.getKey())){
                 //当前同步的这条记录
                 Map<String, Object> binLogMap = binLog.getDataMap();
                // 新库被查询到的记录
                 Map<String, Object> targetMap = respMap.get(entry.getKey());
                 // 处理同步的记录是否需要执行，如果同步的时间大于新库的时间，则代表需要更新,删除的数据不比对时间
                if (!MigrateCheckUtil.comparison(binLogMap,targetMap,tableName)
                        && !BinlogType.DELETE.getValue().equals(binLog.getOperateType())){
                    continue;
                }
             } else {
                 //数据在新库不存在，对多条数据的最后一条结果集的类型为update，需要更正为insert，如果是delete则略过
                 if (BinlogType.UPDATE.getValue().equals(binLog.getOperateType())){
                     binLog.setOperateType(BinlogType.INSERT.getValue());
                 }
                 if (BinlogType.DELETE.getValue().equals(binLog.getOperateType())){
                     continue;
                 }
             }
             // 将需要写入的数据添加到集合中
             binlogDataList.add(binLog);
         }

    }

    /**
     * 批量查询已存在新库的数据
     * @return 已存在新库的数据
     */
    private  Map<String, Map<String, Object>> batchQuery(){
        // 先获取本次迁移的全部唯一key
        List<String> keyStrList = new ArrayList<>(binlogDataMap.keySet());
        binlogDataList = new ArrayList<>(keyStrList.size());
        Map<String,List<String>> keyMap = new HashMap<>();
        //筛选按表为维度的集合
        for (String keyStr:keyStrList){
            String[] split = keyStr.split(SPLIT_KEY);
            List<String> keyList;
            String key = split[0];
            String tableName = split[1];
            if (keyMap.containsKey(tableName)){
                keyList = keyMap.get(tableName);
                keyList.add(key);
            } else {
                keyList = new ArrayList<>();
                keyList.add(key);
            }
            keyMap.put(tableName, keyList);
        }
        Map<String, Map<String, Object>> targetMap = new HashMap<>();
        MigrateService migrateService = ApplicationContextUtil.getBean(MigrateService.class);
        List<Map<String, Object>> targetAllList = new ArrayList<>();
        for (Map.Entry<String,List<String>> mapEntry:keyMap.entrySet()){
            String tableName = mapEntry.getKey();
            List<String> keyList = mapEntry.getValue();
            //数据切割，每次查询200条数据
            int limit = countStep(keyList.size());
            //切割成多个集合对象
            List<List<String>> splitList = Stream.iterate(0, n -> n + 1).limit(limit).parallel().map(
                    a -> keyList.stream().skip((long) a * MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList())).collect(Collectors.toList());
            // 分页查询数据
            for (List<String> strings : splitList)
            {
                List<Map<String, Object>> targetList = migrateService.findByIdentifiers(tableName, strings, DBChannel.CHANNEL_2.getValue());
                targetAllList.addAll(targetList);
            }

            String keyValue = MergeConfig.getSingleKey(tableName);
            for (Map<String, Object> target:targetAllList){
                String mapKey = target.get(keyValue)+"";
                targetMap.put(mapKey + SPLIT_KEY + tableName, target);
            }
        }
        return targetMap;
    }

    /**
     *
     * 对数据进行写入
     */
    public void write(OperateType operateType){
        // 先按表，将数据进行分组
        Map<String,List<BinLog>> binLogMap = binlogDataList.stream().collect(Collectors.groupingBy(BinLog::getTableName));
        //更新这个批次的数据同步完成  完成的数量不等于写入的数量，可能新库存在无需写入
        //scrollInfo.setScrollSize(binlogDataList.size());
        boolean isWrite = true;
        // 遍历不同写入表的集合对象
        for (Map.Entry<String,List<BinLog>> mapEntry:binLogMap.entrySet()){
            String tableName = mapEntry.getKey();
            List<BinLog> binLogList = mapEntry.getValue();

            MigrateService migrateService = ApplicationContextUtil.getBean(MigrateService.class);
            // 批量写入
            boolean isFlag = migrateService.migrateBat(tableName, binLogList);
            // 有一次更新失败，本批次的offset都不更新状态
            if (!isFlag){
                isWrite = false;
            }
        }
        // 批量更新offset的标志，如果更新过程中有一个批次是失败的，都不能更新掉本地同步的offset，待下次拉取的时候更新
        if (isWrite){
            if (OperateType.ADD==operateType){
                updateConsumeRecordStatus();
            }
        }else{
            //如果有更新失败todo 抛出异常，暂停任务，等排查出问题后继续进行
            throw new BusinessException("全量数据写入失败");
        }
    }

    /**
     *  更新消息队列的offset标志为已完成
     */
    private void updateConsumeRecordStatus(){
        EtlBinlogConsumeRecordMapper etlBinlogConsumeRecordMapper = ApplicationContextUtil.getBean(EtlBinlogConsumeRecordMapper.class);
        Map<Integer, List<EtlBinlogConsumeRecord>> integerListMap = etlBinlogConsumeRecordList.stream().collect(Collectors.groupingBy(EtlBinlogConsumeRecord::getQueueId));
        for (Map.Entry<Integer, List<EtlBinlogConsumeRecord>> mapEntry:integerListMap.entrySet()){
            Integer queueId = mapEntry.getKey();
            List<EtlBinlogConsumeRecord> etlBinlogConsumeRecordList = mapEntry.getValue();
            // 批量更新
            etlBinlogConsumeRecordMapper.batchUpdateConsumeRecordStatus(queueId, ConsumerStatus.CONSUME_SUCCESS.getValue(),etlBinlogConsumeRecordList);
        }
    }

    /**
     * 模型转换
     * @param scrollList 滚动List
     * @param tableName 表名
     */
    private void transformModel(List<Map<String, Object>> scrollList,String tableName){
        String key = MergeConfig.getSingleKey(tableName);
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        try {
            for (Map<String, Object> scrollMap:scrollList){
                BinLog binLog = new BinLog();
                binLog.setOperateType(BinlogType.INSERT.getValue());
                binLog.setDataMap(scrollMap);
                binLog.setOperateTime(format.parse(scrollMap.get(TIME_STAMP) + "").getTime());
                binLog.setTableName(tableName);
                binLog.setKey(key);
                binlogDataMap.put(scrollMap.get(key) + SPLIT_KEY + tableName, binLog);
            }
        }catch (Exception e){
            log.error("模型转换出错", e);
        }
    }




    /**
     * 转换成单条存储的sql变更对象
     * @param binlogData MySQL的binlog对象
     * @param dataMap 单条sql的信息
     * @return binlog对象
     */
    private BinLog buildBinLog(BinlogData binlogData,Map<String, Object> dataMap){
        BinLog binLog = new BinLog();
        binLog.setDataMap(dataMap);
        binLog.setOperateTime(binlogData.getOperateTime());
        binLog.setOperateType(binlogData.getOperateType());
        binLog.setTableName(binlogData.getTableName());
        return binLog;
    }
    /**
     * 计算切分次数
     */
    private static Integer countStep(Integer size) {
        return (size + MAX_SEND - 1) / MAX_SEND;
    }


}
